/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.client.solrj.io.stream;

import static org.apache.solr.client.solrj.io.stream.StreamAssert.assertMaps;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
import org.apache.solr.client.solrj.io.stream.metrics.CountMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.embedded.JettySolrRunner;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/**
 * All base tests will be done with CloudSolrStream. Under the covers CloudSolrStream uses
 * SolrStream so SolrStream will get fully exercised through these tests.
 */
@SolrTestCaseJ4.SuppressSSL
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"})
public class StreamingTest extends SolrCloudTestCase {

  public static final String COLLECTIONORALIAS = "streams";
  public static final String MULTI_REPLICA_COLLECTIONORALIAS = "streams-multi-replica";

  private static final StreamFactory streamFactory =
      new StreamFactory()
          .withFunctionName("search", CloudSolrStream.class)
          .withFunctionName("merge", MergeStream.class)
          .withFunctionName("unique", UniqueStream.class)
          .withFunctionName("top", RankStream.class)
          .withFunctionName("reduce", ReducerStream.class)
          .withFunctionName("group", GroupOperation.class)
          .withFunctionName("rollup", RollupStream.class)
          .withFunctionName("parallel", ParallelStream.class);

  private static String zkHost;

  private static int numWorkers;
  private static boolean useAlias;

  @BeforeClass
  public static void configureCluster() throws Exception {
    int numShards = random().nextInt(2) + 1; // 1 - 3
    numWorkers = numShards > 2 ? random().nextInt(numShards - 1) + 1 : numShards;
    configureCluster(numShards)
        .addConfig(
            "conf",
            getFile("solrj")
                .resolve("solr")
                .resolve("configsets")
                .resolve("streaming")
                .resolve("conf"))
        .configure();

    String collection;
    useAlias = random().nextBoolean();
    if (useAlias) {
      collection = COLLECTIONORALIAS + "_collection";
    } else {
      collection = COLLECTIONORALIAS;
    }
    CollectionAdminRequest.createCollection(collection, "conf", numShards, 1)
        .process(cluster.getSolrClient());
    cluster.waitForActiveCollection(collection, numShards, numShards);
    if (useAlias) {
      CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection)
          .process(cluster.getSolrClient());
    }

    zkHost = cluster.getZkServer().getZkAddress();
    streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);

    // Set up multi-replica collection
    if (useAlias) {
      collection = MULTI_REPLICA_COLLECTIONORALIAS + "_collection";
    } else {
      collection = MULTI_REPLICA_COLLECTIONORALIAS;
    }
    CollectionAdminRequest.createCollection(collection, "conf", numShards, 1, 1, 1)
        .process(cluster.getSolrClient());
    cluster.waitForActiveCollection(collection, numShards, numShards * 3);
    if (useAlias) {
      CollectionAdminRequest.createAlias(MULTI_REPLICA_COLLECTIONORALIAS, collection)
          .process(cluster.getSolrClient());
    }
    streamFactory.withCollectionZkHost(MULTI_REPLICA_COLLECTIONORALIAS, zkHost);
  }

  private static final String id = "id";

  // Update request shared by many of the tests
  private final UpdateRequest helloDocsUpdateRequest =
      new UpdateRequest()
          .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
          .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
          .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
          .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
          .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
          .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6")
          .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7")
          .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8")
          .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9")
          .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10");

  @Before
  public void clearCollection() throws Exception {
    new UpdateRequest().deleteByQuery("*:*").commit(cluster.getSolrClient(), COLLECTIONORALIAS);
  }

  @Test
  public void testUniqueStream() throws Exception {

    // Test CloudSolrStream and UniqueStream
    new UpdateRequest()
        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    try {
      SolrParams sParams = params("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
      ustream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(ustream);
      assertEquals(4, tuples.size());
      assertOrder(tuples, 0, 1, 3, 4);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testSpacesInParams() throws Exception {

    SolrParams sParams =
        params("q", "*:*", "fl", "id , a_s , a_i , a_f", "sort", "a_f  asc , a_i  asc");

    // CloudSolrStream compares the values of the sort with the fl field.
    // The constructor will throw an exception if the sort fields do not the
    // a value in the field list.

    CloudSolrStream stream = new CloudSolrStream("", "collection1", sParams);
    stream.close();
  }

  @Test
  public void testNonePartitionKeys() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    try {

      SolrParams sParamsA =
          params(
              "q",
              "*:*",
              "fl",
              "id,a_s,a_i,a_f",
              "sort",
              "a_s asc,a_f asc",
              "partitionKeys",
              "none",
              "qt",
              "/export");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      ParallelStream pstream =
          parallelStream(stream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
      attachStreamFactory(pstream);
      pstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(pstream);

      assertEquals(tuples.size(), (10 * numWorkers)); // Each tuple will be double counted.
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testParallelUniqueStream() throws Exception {

    new UpdateRequest()
        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
        .add(id, "5", "a_s", "hello1", "a_i", "10", "a_f", "1")
        .add(id, "6", "a_s", "hello1", "a_i", "11", "a_f", "5")
        .add(id, "7", "a_s", "hello1", "a_i", "12", "a_f", "5")
        .add(id, "8", "a_s", "hello1", "a_i", "13", "a_f", "4")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {

      SolrParams sParams =
          params(
              "q",
              "*:*",
              "fl",
              "id,a_s,a_i,a_f",
              "sort",
              "a_f asc,a_i asc",
              "partitionKeys",
              "a_f",
              "qt",
              "/export");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      UniqueStream ustream = new UniqueStream(stream, new FieldEqualitor("a_f"));
      ParallelStream pstream =
          parallelStream(ustream, new FieldComparator("a_f", ComparatorOrder.ASCENDING));
      attachStreamFactory(pstream);
      pstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(pstream);
      assertEquals(5, tuples.size());
      assertOrder(tuples, 0, 1, 3, 4, 6);

      // Test the eofTuples

      Map<String, Tuple> eofTuples = pstream.getEofTuples();
      assertEquals(numWorkers, eofTuples.size()); // There should be an EOF tuple for each worker.
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testMultipleFqClauses() throws Exception {

    new UpdateRequest()
        .add(id, "0", "a_ss", "hello0", "a_ss", "hello1", "a_i", "0", "a_f", "0")
        .add(id, "2", "a_ss", "hello2", "a_i", "2", "a_f", "0")
        .add(id, "3", "a_ss", "hello3", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_ss", "hello4", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_ss", "hello1", "a_i", "1", "a_f", "1")
        .add(id, "5", "a_ss", "hello1", "a_i", "10", "a_f", "1")
        .add(id, "6", "a_ss", "hello1", "a_i", "11", "a_f", "5")
        .add(id, "7", "a_ss", "hello1", "a_i", "12", "a_f", "5")
        .add(id, "8", "a_ss", "hello1", "a_i", "13", "a_f", "4")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      ModifiableSolrParams params =
          new ModifiableSolrParams(
              params(
                  "q",
                  "*:*",
                  "fl",
                  "id,a_i",
                  "sort",
                  "a_i asc",
                  "fq",
                  "a_ss:hello0",
                  "fq",
                  "a_ss:hello1"));
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, params);
      stream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(stream);
      assertEquals("Multiple fq clauses should have been honored", 1, tuples.size());
      assertEquals("should only have gotten back document 0", "0", tuples.get(0).getString("id"));
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testRankStream() throws Exception {

    new UpdateRequest()
        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    try {
      SolrParams sParams = params("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      RankStream rstream =
          new RankStream(stream, 3, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
      rstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(rstream);
      assertEquals(3, tuples.size());
      assertOrder(tuples, 4, 3, 2);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testParallelRankStream() throws Exception {

    new UpdateRequest()
        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
        .add(id, "5", "a_s", "hello1", "a_i", "5", "a_f", "1")
        .add(id, "6", "a_s", "hello1", "a_i", "6", "a_f", "1")
        .add(id, "7", "a_s", "hello1", "a_i", "7", "a_f", "1")
        .add(id, "8", "a_s", "hello1", "a_i", "8", "a_f", "1")
        .add(id, "9", "a_s", "hello1", "a_i", "9", "a_f", "1")
        .add(id, "10", "a_s", "hello1", "a_i", "10", "a_f", "1")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    try {
      SolrParams sParams =
          params(
              "q",
              "*:*",
              "fl",
              "id,a_s,a_i",
              "sort",
              "a_i asc",
              "partitionKeys",
              "a_i",
              "qt",
              "/export");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      RankStream rstream =
          new RankStream(stream, 11, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
      ParallelStream pstream =
          parallelStream(rstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
      attachStreamFactory(pstream);
      pstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(pstream);

      assertEquals(10, tuples.size());
      assertOrder(tuples, 10, 9, 8, 7, 6, 5, 4, 3, 2, 0);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testTrace() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      // Test with spaces in the parameter lists.
      SolrParams sParamsA =
          params("q", "*:*", "fl", "id,a_s, a_i,a_f", "sort", "a_s asc,a_f   asc");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      stream.setTrace(true);
      stream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(stream);
      assertEquals(COLLECTIONORALIAS, tuples.get(0).get("_COLLECTION_"));
      assertEquals(COLLECTIONORALIAS, tuples.get(1).get("_COLLECTION_"));
      assertEquals(COLLECTIONORALIAS, tuples.get(2).get("_COLLECTION_"));
      assertEquals(COLLECTIONORALIAS, tuples.get(3).get("_COLLECTION_"));
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testReducerStream() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      // Test with spaces in the parameter lists.
      SolrParams sParamsA =
          params("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      ReducerStream rstream =
          new ReducerStream(
              stream,
              new FieldEqualitor("a_s"),
              new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));

      rstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(rstream);

      assertEquals(3, tuples.size());

      Tuple t0 = tuples.get(0);
      List<Map<?, ?>> maps0 = t0.getMaps("group");
      assertMaps(maps0, 0, 2, 1, 9);

      Tuple t1 = tuples.get(1);
      List<Map<?, ?>> maps1 = t1.getMaps("group");
      assertMaps(maps1, 3, 5, 7, 8);

      Tuple t2 = tuples.get(2);
      List<Map<?, ?>> maps2 = t2.getMaps("group");
      assertMaps(maps2, 4, 6);

      // Test with spaces in the parameter lists using a comparator
      sParamsA = params("q", "*:*", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      rstream =
          new ReducerStream(
              stream,
              new FieldComparator("a_s", ComparatorOrder.ASCENDING),
              new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
      rstream.setStreamContext(streamContext);
      tuples = getTuples(rstream);

      assertEquals(3, tuples.size());

      t0 = tuples.get(0);
      maps0 = t0.getMaps("group");
      assertMaps(maps0, 9, 1, 2, 0);

      t1 = tuples.get(1);
      maps1 = t1.getMaps("group");
      assertMaps(maps1, 8, 7, 5, 3);

      t2 = tuples.get(2);
      maps2 = t2.getMaps("group");
      assertMaps(maps2, 6, 4);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testZeroReducerStream() throws Exception {

    // Gracefully handle zero results
    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      // Test with spaces in the parameter lists.
      SolrParams sParamsA =
          params("q", "blah", "fl", "id,a_s, a_i,  a_f", "sort", "a_s asc  ,  a_f   asc");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      ReducerStream rstream =
          new ReducerStream(
              stream,
              new FieldEqualitor("a_s"),
              new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 5));
      rstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(rstream);

      assertEquals(0, tuples.size());
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testParallelReducerStream() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      SolrParams sParamsA =
          params(
              "q",
              "*:*",
              "fl",
              "id,a_s,a_i,a_f",
              "sort",
              "a_s asc,a_f asc",
              "partitionKeys",
              "a_s",
              "qt",
              "/export");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      ReducerStream rstream =
          new ReducerStream(
              stream,
              new FieldEqualitor("a_s"),
              new GroupOperation(new FieldComparator("a_f", ComparatorOrder.DESCENDING), 5));
      ParallelStream pstream =
          parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
      attachStreamFactory(pstream);
      pstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(pstream);

      assertEquals(3, tuples.size());

      Tuple t0 = tuples.get(0);
      List<Map<?, ?>> maps0 = t0.getMaps("group");
      assertMaps(maps0, 9, 1, 2, 0);

      Tuple t1 = tuples.get(1);
      List<Map<?, ?>> maps1 = t1.getMaps("group");
      assertMaps(maps1, 8, 7, 5, 3);

      Tuple t2 = tuples.get(2);
      List<Map<?, ?>> maps2 = t2.getMaps("group");
      assertMaps(maps2, 6, 4);

      // Test Descending with Ascending subsort

      sParamsA =
          params(
              "q",
              "*:*",
              "fl",
              "id,a_s,a_i,a_f",
              "sort",
              "a_s desc,a_f asc",
              "partitionKeys",
              "a_s",
              "qt",
              "/export");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      rstream =
          new ReducerStream(
              stream,
              new FieldEqualitor("a_s"),
              new GroupOperation(new FieldComparator("a_f", ComparatorOrder.ASCENDING), 3));
      pstream = parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.DESCENDING));
      attachStreamFactory(pstream);
      pstream.setStreamContext(streamContext);
      tuples = getTuples(pstream);

      assertEquals(3, tuples.size());

      t0 = tuples.get(0);
      maps0 = t0.getMaps("group");
      assertMaps(maps0, 4, 6);

      t1 = tuples.get(1);
      maps1 = t1.getMaps("group");
      assertMaps(maps1, 3, 5, 7);

      t2 = tuples.get(2);
      maps2 = t2.getMaps("group");
      assertMaps(maps2, 0, 2, 1);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testExceptionStream() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    try {
      // Test an error that originates from the /select handler
      SolrParams sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      ExceptionStream estream = new ExceptionStream(stream);
      estream.setStreamContext(streamContext);
      Tuple t = getTuple(estream);
      assertTrue(t.EOF);
      assertTrue(t.EXCEPTION);
      assertTrue(t.getException().contains("sort param field can't be found: blah"));

      sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "wt", "javabin");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      estream = new ExceptionStream(stream);
      estream.setStreamContext(streamContext);
      t = getTuple(estream);
      assertTrue(t.EOF);
      assertTrue(t.EXCEPTION);
      assertTrue(t.getException().contains("sort param field can't be found: blah"));

      // Test an error that originates from the /export handler
      sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc", "qt", "/export");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      estream = new ExceptionStream(stream);
      estream.setStreamContext(streamContext);
      t = getTuple(estream);
      assertTrue(t.EOF);
      assertTrue(t.EXCEPTION);
      assertTrue(t.getException().contains("sort param field can't be found: blah"));

      // Test an error that originates from the /export handler
      sParamsA =
          params(
              "q",
              "*:*",
              "fl",
              "a_s,a_i,a_f,blah",
              "sort",
              "blah asc",
              "qt",
              "/export",
              "wt",
              "javabin");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      estream = new ExceptionStream(stream);
      estream.setStreamContext(streamContext);
      t = getTuple(estream);
      assertTrue(t.EOF);
      assertTrue(t.EXCEPTION);
      assertTrue(t.getException().contains("sort param field can't be found: blah"));
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  @Ignore
  public void testParallelExceptionStream() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    SolrParams sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f,blah", "sort", "blah asc");
    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
    ParallelStream pstream =
        new ParallelStream(
            zkHost,
            COLLECTIONORALIAS,
            stream,
            2,
            new FieldComparator("blah", ComparatorOrder.ASCENDING));
    ExceptionStream estream = new ExceptionStream(pstream);
    Tuple t = getTuple(estream);
    assertTrue(t.EOF);
    assertTrue(t.EXCEPTION);
    // ParallelStream requires that partitionKeys be set.
    assertTrue(t.getException().contains("When numWorkers > 1 partitionKeys must be set."));

    // Test an error that originates from the /select handler
    sParamsA =
        params(
            "q",
            "*:*",
            "fl",
            "a_s,a_i,a_f,blah",
            "sort",
            "blah asc",
            "partitionKeys",
            "a_s",
            "qt",
            "/export");
    stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
    pstream =
        new ParallelStream(
            zkHost,
            COLLECTIONORALIAS,
            stream,
            2,
            new FieldComparator("blah", ComparatorOrder.ASCENDING));
    estream = new ExceptionStream(pstream);
    t = getTuple(estream);
    assertTrue(t.EOF);
    assertTrue(t.EXCEPTION);
    assertTrue(t.getException().contains("sort param field can't be found: blah"));

    // Test an error that originates from the /export handler
    sParamsA =
        params(
            "q",
            "*:*",
            "fl",
            "a_s,a_i,a_f,score",
            "sort",
            "a_s asc",
            "qt",
            "/export",
            "partitionKeys",
            "a_s");
    stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
    pstream =
        new ParallelStream(
            zkHost,
            COLLECTIONORALIAS,
            stream,
            2,
            new FieldComparator("a_s", ComparatorOrder.ASCENDING));
    estream = new ExceptionStream(pstream);
    t = getTuple(estream);
    assertTrue(t.EOF);
    assertTrue(t.EXCEPTION);
    // The /export handler will pass through a real exception.
    assertTrue(t.getException().contains("undefined field:"));
  }

  @Test
  public void testStatsStream() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      SolrParams sParamsA = params("q", "*:*");

      Metric[] metrics = {
        new SumMetric("a_i"),
        new SumMetric("a_f"),
        new MinMetric("a_i"),
        new MinMetric("a_f"),
        new MaxMetric("a_i"),
        new MaxMetric("a_f"),
        new MeanMetric("a_i"),
        new MeanMetric("a_f"),
        new CountMetric()
      };

      StatsStream statsStream = new StatsStream(zkHost, COLLECTIONORALIAS, sParamsA, metrics);
      statsStream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(statsStream);

      assertEquals(1, tuples.size());

      // Test Long and Double Sums

      Tuple tuple = tuples.get(0);

      Double sumi = tuple.getDouble("sum(a_i)");
      Double sumf = tuple.getDouble("sum(a_f)");
      Double mini = tuple.getDouble("min(a_i)");
      Double minf = tuple.getDouble("min(a_f)");
      Double maxi = tuple.getDouble("max(a_i)");
      Double maxf = tuple.getDouble("max(a_f)");
      Double avgi = tuple.getDouble("avg(a_i)");
      Double avgf = tuple.getDouble("avg(a_f)");
      Double count = tuple.getDouble("count(*)");

      assertEquals(70, sumi.longValue());
      assertEquals(55.0, sumf, 0.01);
      assertEquals(0.0, mini, 0.01);
      assertEquals(1.0, minf, 0.01);
      assertEquals(14.0, maxi, 0.01);
      assertEquals(10.0, maxf, 0.01);
      assertEquals(7.0, avgi, .01);
      assertEquals(5.5, avgf, .001);
      assertEquals(10, count, .01);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testFacetStream() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      SolrParams sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");

      Bucket[] buckets = {new Bucket("a_s")};

      Metric[] metrics = {
        new SumMetric("a_i"),
        new SumMetric("a_f"),
        new MinMetric("a_i"),
        new MinMetric("a_f"),
        new MaxMetric("a_i"),
        new MaxMetric("a_f"),
        new MeanMetric("a_i"),
        new MeanMetric("a_f"),
        new CountMetric()
      };

      FieldComparator[] sorts = {new FieldComparator("sum(a_i)", ComparatorOrder.ASCENDING)};

      FacetStream facetStream =
          new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);

      List<Tuple> tuples = getTuples(facetStream);

      assertEquals(3, tuples.size());

      // Test Long and Double Sums

      Tuple tuple = tuples.get(0);
      String bucket = tuple.getString("a_s");
      Double sumi = tuple.getDouble("sum(a_i)");
      Double sumf = tuple.getDouble("sum(a_f)");
      Double mini = tuple.getDouble("min(a_i)");
      Double minf = tuple.getDouble("min(a_f)");
      Double maxi = tuple.getDouble("max(a_i)");
      Double maxf = tuple.getDouble("max(a_f)");
      Double avgi = tuple.getDouble("avg(a_i)");
      Double avgf = tuple.getDouble("avg(a_f)");
      Double count = tuple.getDouble("count(*)");

      assertEquals("hello4", bucket);
      assertEquals(15, sumi.longValue());
      assertEquals(11.0, sumf, 0.01);
      assertEquals(4.0, mini, 0.01);
      assertEquals(4.0, minf, 0.01);
      assertEquals(11.0, maxi, 0.01);
      assertEquals(7.0, maxf, 0.01);
      assertEquals(7.5, avgi, 0.01);
      assertEquals(5.5, avgf, 0.01);
      assertEquals(2, count, 0.01);

      tuple = tuples.get(1);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello0", bucket);
      assertEquals(17, sumi, .01);
      assertEquals(18, sumf, .01);
      assertEquals(0.0, mini, .01);
      assertEquals(1.0, minf, .01);
      assertEquals(14.0, maxi, .01);
      assertEquals(10.0, maxf, .01);
      assertEquals(4.25, avgi, .01);
      assertEquals(4.5, avgf, .01);
      assertEquals(4, count, .01);

      tuple = tuples.get(2);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello3", bucket);
      assertEquals(38.0, sumi, 0.01);
      assertEquals(26.0, sumf, 0.01);
      assertEquals(3.0, mini, 0.01);
      assertEquals(3.0, minf, 0.01);
      assertEquals(13.0, maxi, 0.01);
      assertEquals(9.0, maxf, 0.01);
      assertEquals(9.5, avgi, 0.01);
      assertEquals(6.5, avgf, 0.01);
      assertEquals(4, count, 0.01);

      // Reverse the Sort.

      sorts[0] = new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING);

      facetStream =
          new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);

      tuples = getTuples(facetStream);

      assertEquals(3, tuples.size());

      // Test Long and Double Sums

      tuple = tuples.get(0);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello3", bucket);
      assertEquals(38, sumi, 0.1);
      assertEquals(26, sumf, 0.1);
      assertEquals(3, mini, 0.1);
      assertEquals(3, minf, 0.1);
      assertEquals(13, maxi, 0.1);
      assertEquals(9, maxf, 0.1);
      assertEquals(9.5, avgi, 0.1);
      assertEquals(6.5, avgf, 0.1);
      assertEquals(4, count, 0.1);

      tuple = tuples.get(1);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello0", bucket);
      assertEquals(17, sumi, 0.01);
      assertEquals(18, sumf, 0.01);
      assertEquals(0, mini, 0.01);
      assertEquals(1, minf, 0.01);
      assertEquals(14, maxi, 0.01);
      assertEquals(10, maxf, 0.01);
      assertEquals(4.25, avgi, 0.01);
      assertEquals(4.5, avgf, 0.01);
      assertEquals(4, count, 0.01);

      tuple = tuples.get(2);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello4", bucket);
      assertEquals(15, sumi.longValue());
      assertEquals(11, sumf, 0.01);
      assertEquals(4.0, mini, 0.01);
      assertEquals(4.0, minf, 0.01);
      assertEquals(11.0, maxi, 0.01);
      assertEquals(7.0, maxf, 0.01);
      assertEquals(7.5, avgi, 0.01);
      assertEquals(5.5, avgf, 0.01);
      assertEquals(2, count, 0.01);

      // Test index sort

      sorts[0] = new FieldComparator("a_s", ComparatorOrder.DESCENDING);

      facetStream =
          new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
      facetStream.setStreamContext(streamContext);

      tuples = getTuples(facetStream);

      assertEquals(3, tuples.size());

      tuple = tuples.get(0);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello4", bucket);
      assertEquals(15, sumi.longValue());
      assertEquals(11, sumf, 0.01);
      assertEquals(4, mini, 0.01);
      assertEquals(4, minf, 0.01);
      assertEquals(11, maxi, 0.01);
      assertEquals(7, maxf, 0.01);
      assertEquals(7.5, avgi, 0.01);
      assertEquals(5.5, avgf, 0.01);
      assertEquals(2, count, 0.01);

      tuple = tuples.get(1);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello3", bucket);
      assertEquals(38.0D, sumi, 0.01);
      assertEquals(26.0D, sumf, 0.01);
      assertEquals(3.0D, mini, 0.01);
      assertEquals(3.0D, minf, 0.01);
      assertEquals(13.0D, maxi, 0.01);
      assertEquals(9.0D, maxf, 0.01);
      assertEquals(9.5D, avgi, 0.01);
      assertEquals(6.5D, avgf, 0.01);
      assertEquals(4, count, 0.01);

      tuple = tuples.get(2);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello0", bucket);
      assertEquals(17, sumi, 0.01);
      assertEquals(18, sumf, 0.01);
      assertEquals(0, mini, 0.01);
      assertEquals(1, minf, 0.01);
      assertEquals(14, maxi, 0.01);
      assertEquals(10, maxf, 0.01);
      assertEquals(4.25, avgi, 0.01);
      assertEquals(4.5, avgf, 0.01);
      assertEquals(4, count, 0.01);

      // Test index sort

      sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);

      facetStream =
          new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
      facetStream.setStreamContext(streamContext);
      tuples = getTuples(facetStream);

      assertEquals(3, tuples.size());

      tuple = tuples.get(0);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello0", bucket);
      assertEquals(17, sumi, 0.01);
      assertEquals(18, sumf, 0.01);
      assertEquals(0, mini, 0.01);
      assertEquals(1, minf, 0.01);
      assertEquals(14, maxi, 0.01);
      assertEquals(10, maxf, 0.01);
      assertEquals(4.25, avgi, 0.0001);
      assertEquals(4.5, avgf, 0.001);
      assertEquals(4, count, 0.01);

      tuple = tuples.get(1);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello3", bucket);
      assertEquals(38, sumi, 0.01);
      assertEquals(26, sumf, 0.01);
      assertEquals(3, mini, 0.01);
      assertEquals(3, minf, 0.01);
      assertEquals(13, maxi, 0.01);
      assertEquals(9, maxf, 0.01);
      assertEquals(9.5, avgi, 0.01);
      assertEquals(6.5, avgf, 0.01);
      assertEquals(4, count, 0.01);

      tuple = tuples.get(2);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello4", bucket);
      assertEquals(15, sumi.longValue());
      assertEquals(11.0, sumf, 0.1);
      assertEquals(4.0, mini, 0.1);
      assertEquals(4.0, minf, 0.1);
      assertEquals(11.0, maxi, 0.1);
      assertEquals(7.0, maxf, 0.1);
      assertEquals(7.5, avgi, 0.1);
      assertEquals(5.5, avgf, 0.1);
      assertEquals(2, count, 0.1);

      sorts[0] = new FieldComparator("a_s", ComparatorOrder.ASCENDING);

      facetStream =
          new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, -1);
      facetStream.setStreamContext(streamContext);
      tuples = getTuples(facetStream);

      assertEquals(3, tuples.size());

    } finally {
      solrClientCache.close();
    }
  }

  String[] docPairs(int base, String sSeq) {
    List<String> pairs = new ArrayList<>();
    final int iSeq = base * 100;
    pairs.add(id);
    pairs.add(sSeq + base); // aaa1
    pairs.add("s_sing");
    pairs.add(Integer.toString(iSeq + 1)); // 101
    pairs.add("i_sing");
    pairs.add(Integer.toString(iSeq + 2)); // 102
    pairs.add("f_sing");
    pairs.add(Float.toString(iSeq + 3)); // 103.0
    pairs.add("l_sing");
    pairs.add(Long.toString(iSeq + 4)); // 104
    pairs.add("d_sing");
    pairs.add(Double.toString(iSeq + 5)); // 105
    pairs.add("dt_sing");
    // Works as long as we add fewer than 60 docs
    pairs.add(String.format(Locale.ROOT, "2000-01-01T%02d:00:00Z", base));
    pairs.add("b_sing");
    pairs.add((base % 2) == 0 ? "T" : "F"); // Tricky

    String[] ret = new String[pairs.size()];
    return pairs.toArray(ret);
  }

  // Select and export should be identical sort orders I think.
  private void checkSort(JettySolrRunner jetty, String field, String sortDir, String[] fields)
      throws IOException {

    // Comes back after LUCENE-7548
    //    SolrQuery query = new SolrQuery("*:*");
    //    query.addSort(field, ("asc".equals(sortDir) ? SolrQuery.ORDER.asc :
    // SolrQuery.ORDER.desc));
    //    query.addSort("id", SolrQuery.ORDER.asc);
    //    query.addField("id");
    //    query.addField(field);
    //    query.setRequestHandler("standard");
    //    query.setRows(100);
    //
    //    List<String> selectOrder = new ArrayList<>();
    //
    //    String url = jetty.getBaseUrl() + "/" + COLLECTION;
    //
    //    try (HttpSolrClient client = getHttpSolrClient(url)) {
    //      client.setConnectionTimeout(DEFAULT_CONNECTION_TIMEOUT);
    //      QueryResponse rsp = client.query(query);
    //      for (SolrDocument doc : rsp.getResults()) {
    //        selectOrder.add((String) doc.getFieldValue("id"));
    //      }
    //    }
    //    SolrParams exportParams = params("q", "*:*", "qt", "/export", "fl", "id," + field, "sort",
    // field + " " + sortDir + ",id asc");
    //    try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS,
    // exportParams)) {
    //      List<Tuple> tuples = getTuples(solrStream);
    //      assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
    //      // Since the getTuples method doesn't return the EOF tuple, these two entries should be
    // the same size.
    //      assertEquals("Tuple count should exactly match sort array size for field " + field + "
    // sort order " + sortDir, selectOrder.size(), tuples.size());
    //
    //      for (int idx = 0; idx < selectOrder.size(); ++idx) { // Tuples should be in lock step
    // with the orders from select.
    //        assertEquals("Order for missing docValues fields wrong for field '" + field + "' sort
    // direction '" + sortDir,
    //            tuples.get(idx).getString("id"), selectOrder.get(idx));
    //      }
    //    }

    // Remove below and uncomment above after LUCENE-7548
    List<String> selectOrder =
        ("asc".equals(sortDir)) ? Arrays.asList(ascOrder) : Arrays.asList(descOrder);
    List<String> selectOrderBool =
        ("asc".equals(sortDir)) ? Arrays.asList(ascOrderBool) : Arrays.asList(descOrderBool);
    SolrParams exportParams =
        params(
            "q",
            "*:*",
            "qt",
            "/export",
            "fl",
            "id," + field,
            "sort",
            field + " " + sortDir + ",id asc");
    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    try (CloudSolrStream solrStream =
        new CloudSolrStream(zkHost, COLLECTIONORALIAS, exportParams)) {
      solrStream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(solrStream);
      assertEquals("There should be exactly 32 responses returned", 32, tuples.size());
      // Since the getTuples method doesn't return the EOF tuple, these two entries should be the
      // same size.
      assertEquals(
          "Tuple count should exactly match sort array size for field "
              + field
              + " sort order "
              + sortDir,
          selectOrder.size(),
          tuples.size());

      // Tuples should be in lock step with the orders passed in.
      for (int idx = 0; idx < selectOrder.size(); ++idx) {
        assertEquals(
            "Order for missing docValues fields wrong for field '"
                + field
                + "' sort direction '"
                + sortDir
                + "' RESTORE GETTING selectOrder from select statement after LUCENE-7548",
            tuples.get(idx).getString("id"),
            (field.startsWith("b_") ? selectOrderBool.get(idx) : selectOrder.get(idx)));
      }
    } finally {
      solrClientCache.close();
    }
  }

  static final String[] voidIds =
      new String[] {
        "iii1", "eee1", "aaa1", "ooo1", "iii2", "eee2", "aaa2", "ooo2", "iii3", "eee3", "aaa3",
        "ooo3"
      };

  private void checkReturnValsForEmpty(String[] fields) throws IOException {

    Set<String> voids = new HashSet<>(Arrays.asList(voidIds));

    StringBuilder fl = new StringBuilder("id");
    for (String f : fields) {
      fl.append(",").append(f);
    }
    SolrParams sParams = params("q", "*:*", "qt", "/export", "fl", fl.toString(), "sort", "id asc");

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try (CloudSolrStream solrStream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams)) {
      solrStream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(solrStream);
      assertEquals("There should be exactly 32 responses returned", 32, tuples.size());

      for (Tuple tuple : tuples) {
        String id = tuple.getString("id");
        if (voids.contains(id)) {
          for (String f : fields) {
            assertNull("Should have returned a void for field " + f + " doc " + id, tuple.get(f));
          }
        } else {
          for (String f : fields) {
            assertNotNull(
                "Should have returned a value for field " + f + " doc " + id, tuple.get(f));
          }
        }
      }
    } finally {
      solrClientCache.close();
    }
  }

  // Goes away after LUCENE-7548
  static final String[] ascOrder =
      new String[] {
        "aaa1", "aaa2", "aaa3", "eee1",
        "eee2", "eee3", "iii1", "iii2",
        "iii3", "ooo1", "ooo2", "ooo3",
        "aaa4", "eee4", "iii4", "ooo4",
        "aaa5", "eee5", "iii5", "ooo5",
        "aaa6", "eee6", "iii6", "ooo6",
        "aaa7", "eee7", "iii7", "ooo7",
        "aaa8", "eee8", "iii8", "ooo8"
      };

  // Goes away after LUCENE-7548
  static final String[] descOrder =
      new String[] {
        "aaa8", "eee8", "iii8", "ooo8",
        "aaa7", "eee7", "iii7", "ooo7",
        "aaa6", "eee6", "iii6", "ooo6",
        "aaa5", "eee5", "iii5", "ooo5",
        "aaa4", "eee4", "iii4", "ooo4",
        "aaa1", "aaa2", "aaa3", "eee1",
        "eee2", "eee3", "iii1", "iii2",
        "iii3", "ooo1", "ooo2", "ooo3"
      };

  // Goes away after LUCENE-7548
  static final String[] ascOrderBool =
      new String[] {
        "aaa1", "aaa2", "aaa3", "eee1",
        "eee2", "eee3", "iii1", "iii2",
        "iii3", "ooo1", "ooo2", "ooo3",
        "aaa5", "aaa7", "eee5", "eee7",
        "iii5", "iii7", "ooo5", "ooo7",
        "aaa4", "aaa6", "aaa8", "eee4",
        "eee6", "eee8", "iii4", "iii6",
        "iii8", "ooo4", "ooo6", "ooo8"
      };

  // Goes away after LUCENE-7548
  static final String[] descOrderBool =
      new String[] {
        "aaa4", "aaa6", "aaa8", "eee4",
        "eee6", "eee8", "iii4", "iii6",
        "iii8", "ooo4", "ooo6", "ooo8",
        "aaa5", "aaa7", "eee5", "eee7",
        "iii5", "iii7", "ooo5", "ooo7",
        "aaa1", "aaa2", "aaa3", "eee1",
        "eee2", "eee3", "iii1", "iii2",
        "iii3", "ooo1", "ooo2", "ooo3",
      };

  @Test
  public void testMissingFields() throws Exception {

    new UpdateRequest()
        // Some docs with nothing at all for any of the "interesting" fields.
        .add(id, "iii1")
        .add(id, "eee1")
        .add(id, "aaa1")
        .add(id, "ooo1")
        .add(id, "iii2")
        .add(id, "eee2")
        .add(id, "aaa2")
        .add(id, "ooo2")
        .add(id, "iii3")
        .add(id, "eee3")
        .add(id, "aaa3")
        .add(id, "ooo3")

        // Docs with values for all the types we want to sort on.

        .add(docPairs(4, "iii"))
        .add(docPairs(4, "eee"))
        .add(docPairs(4, "aaa"))
        .add(docPairs(4, "ooo"))
        .add(docPairs(5, "iii"))
        .add(docPairs(5, "eee"))
        .add(docPairs(5, "aaa"))
        .add(docPairs(5, "ooo"))
        .add(docPairs(6, "iii"))
        .add(docPairs(6, "eee"))
        .add(docPairs(6, "aaa"))
        .add(docPairs(6, "ooo"))
        .add(docPairs(7, "iii"))
        .add(docPairs(7, "eee"))
        .add(docPairs(7, "aaa"))
        .add(docPairs(7, "ooo"))
        .add(docPairs(8, "iii"))
        .add(docPairs(8, "eee"))
        .add(docPairs(8, "aaa"))
        .add(docPairs(8, "ooo"))
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);

    String[] fields =
        new String[] {"s_sing", "i_sing", "f_sing", "l_sing", "d_sing", "dt_sing", "b_sing"};

    for (String f : fields) {
      checkSort(jetty, f, "asc", fields);
      checkSort(jetty, f, "desc", fields);
    }

    checkReturnValsForEmpty(fields);
  }

  @Test
  public void testSubFacetStream() throws Exception {

    new UpdateRequest()
        .add(id, "0", "level1_s", "hello0", "level2_s", "a", "a_i", "0", "a_f", "1")
        .add(id, "2", "level1_s", "hello0", "level2_s", "a", "a_i", "2", "a_f", "2")
        .add(id, "3", "level1_s", "hello3", "level2_s", "a", "a_i", "3", "a_f", "3")
        .add(id, "4", "level1_s", "hello4", "level2_s", "a", "a_i", "4", "a_f", "4")
        .add(id, "1", "level1_s", "hello0", "level2_s", "b", "a_i", "1", "a_f", "5")
        .add(id, "5", "level1_s", "hello3", "level2_s", "b", "a_i", "10", "a_f", "6")
        .add(id, "6", "level1_s", "hello4", "level2_s", "b", "a_i", "11", "a_f", "7")
        .add(id, "7", "level1_s", "hello3", "level2_s", "b", "a_i", "12", "a_f", "8")
        .add(id, "8", "level1_s", "hello3", "level2_s", "b", "a_i", "13", "a_f", "9")
        .add(id, "9", "level1_s", "hello0", "level2_s", "b", "a_i", "14", "a_f", "10")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {

      SolrParams sParamsA = params("q", "*:*", "fl", "a_i,a_f");

      Bucket[] buckets = {new Bucket("level1_s"), new Bucket("level2_s")};

      Metric[] metrics = {new SumMetric("a_i"), new CountMetric()};

      FieldComparator[] sorts = {
        new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING),
        new FieldComparator("sum(a_i)", ComparatorOrder.DESCENDING)
      };

      FacetStream facetStream =
          new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
      facetStream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(facetStream);
      assertEquals(6, tuples.size());

      Tuple tuple = tuples.get(0);
      String bucket1 = tuple.getString("level1_s");
      String bucket2 = tuple.getString("level2_s");
      Double sumi = tuple.getDouble("sum(a_i)");
      Double count = tuple.getDouble("count(*)");

      assertEquals("hello3", bucket1);
      assertEquals("b", bucket2);
      assertEquals(35, sumi.longValue());
      assertEquals(3, count, 0.1);

      tuple = tuples.get(1);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello0", bucket1);
      assertEquals("b", bucket2);
      assertEquals(15, sumi.longValue());
      assertEquals(2, count, 0.1);

      tuple = tuples.get(2);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello4", bucket1);
      assertEquals("b", bucket2);
      assertEquals(11, sumi.longValue());
      assertEquals(1, count, 0.1);

      tuple = tuples.get(3);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello4", bucket1);
      assertEquals("a", bucket2);
      assertEquals(4, sumi.longValue());
      assertEquals(1, count, 0.1);

      tuple = tuples.get(4);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello3", bucket1);
      assertEquals("a", bucket2);
      assertEquals(3, sumi.longValue());
      assertEquals(1, count, 0.1);

      tuple = tuples.get(5);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello0", bucket1);
      assertEquals("a", bucket2);
      assertEquals(2, sumi.longValue());
      assertEquals(2, count, 0.1);

      sorts[0] = new FieldComparator("level1_s", ComparatorOrder.DESCENDING);
      sorts[1] = new FieldComparator("level2_s", ComparatorOrder.DESCENDING);
      facetStream =
          new FacetStream(zkHost, COLLECTIONORALIAS, sParamsA, buckets, metrics, sorts, 100);
      facetStream.setStreamContext(streamContext);
      tuples = getTuples(facetStream);
      assertEquals(6, tuples.size());

      tuple = tuples.get(0);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello4", bucket1);
      assertEquals("b", bucket2);
      assertEquals(11, sumi.longValue());
      assertEquals(1, count, 0.1);

      tuple = tuples.get(1);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello4", bucket1);
      assertEquals("a", bucket2);
      assertEquals(4, sumi.longValue());
      assertEquals(1, count, 0.1);

      tuple = tuples.get(2);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello3", bucket1);
      assertEquals("b", bucket2);
      assertEquals(35, sumi.longValue());
      assertEquals(3, count, 0.1);

      tuple = tuples.get(3);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello3", bucket1);
      assertEquals("a", bucket2);
      assertEquals(3, sumi.longValue());
      assertEquals(1, count, 0.1);

      tuple = tuples.get(4);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello0", bucket1);
      assertEquals("b", bucket2);
      assertEquals(15, sumi.longValue());
      assertEquals(2, count, 0.1);

      tuple = tuples.get(5);
      bucket1 = tuple.getString("level1_s");
      bucket2 = tuple.getString("level2_s");
      sumi = tuple.getDouble("sum(a_i)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello0", bucket1);
      assertEquals("a", bucket2);
      assertEquals(2, sumi.longValue());
      assertEquals(2, count, 0.1);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testRollupStream() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      SolrParams sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      Bucket[] buckets = {new Bucket("a_s")};

      Metric[] metrics = {
        new SumMetric("a_i"),
        new SumMetric("a_f"),
        new MinMetric("a_i"),
        new MinMetric("a_f"),
        new MaxMetric("a_i"),
        new MaxMetric("a_f"),
        new MeanMetric("a_i"),
        new MeanMetric("a_f"),
        new CountMetric()
      };

      RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
      rollupStream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(rollupStream);

      assertEquals(3, tuples.size());

      // Test Long and Double Sums

      Tuple tuple = tuples.get(0);
      String bucket = tuple.getString("a_s");
      Double sumi = tuple.getDouble("sum(a_i)");
      Double sumf = tuple.getDouble("sum(a_f)");
      Double mini = tuple.getDouble("min(a_i)");
      Double minf = tuple.getDouble("min(a_f)");
      Double maxi = tuple.getDouble("max(a_i)");
      Double maxf = tuple.getDouble("max(a_f)");
      Double avgi = tuple.getDouble("avg(a_i)");
      Double avgf = tuple.getDouble("avg(a_f)");
      Double count = tuple.getDouble("count(*)");

      assertEquals("hello0", bucket);
      assertEquals(17, sumi, 0.001);
      assertEquals(18, sumf, 0.001);
      assertEquals(0, mini, 0.001);
      assertEquals(1, minf, 0.001);
      assertEquals(14, maxi, 0.001);
      assertEquals(10, maxf, 0.001);
      assertEquals(4.25, avgi, 0.001);
      assertEquals(4.5, avgf, 0.001);
      assertEquals(4, count, 0.001);

      tuple = tuples.get(1);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello3", bucket);
      assertEquals(38, sumi, 0.001);
      assertEquals(26, sumf, 0.001);
      assertEquals(3, mini, 0.001);
      assertEquals(3, minf, 0.001);
      assertEquals(13, maxi, 0.001);
      assertEquals(9, maxf, 0.001);
      assertEquals(9.5, avgi, 0.001);
      assertEquals(6.5, avgf, 0.001);
      assertEquals(4, count, 0.001);

      tuple = tuples.get(2);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello4", bucket);
      assertEquals(15, sumi.longValue());
      assertEquals(11, sumf, 0.01);
      assertEquals(4, mini, 0.01);
      assertEquals(4, minf, 0.01);
      assertEquals(11, maxi, 0.01);
      assertEquals(7, maxf, 0.01);
      assertEquals(7.5, avgi, 0.01);
      assertEquals(5.5, avgf, 0.01);
      assertEquals(2, count, 0.01);

      // Test will null metrics
      rollupStream = new RollupStream(stream, buckets, metrics);
      rollupStream.setStreamContext(streamContext);
      tuples = getTuples(rollupStream);

      assertEquals(3, tuples.size());
      tuple = tuples.get(0);
      bucket = tuple.getString("a_s");
      assertEquals("hello0", bucket);

      tuple = tuples.get(1);
      bucket = tuple.getString("a_s");
      assertEquals("hello3", bucket);

      tuple = tuples.get(2);
      bucket = tuple.getString("a_s");
      assertEquals("hello4", bucket);

      // Test will null value in the grouping field
      new UpdateRequest()
          .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10")
          .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

      sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      Bucket[] buckets1 = {new Bucket("a_s")};

      Metric[] metrics1 = {
        new SumMetric("a_i"),
        new SumMetric("a_f"),
        new MinMetric("a_i"),
        new MinMetric("a_f"),
        new MaxMetric("a_i"),
        new MaxMetric("a_f"),
        new MeanMetric("a_i"),
        new MeanMetric("a_f"),
        new CountMetric()
      };

      rollupStream = new RollupStream(stream, buckets1, metrics1);
      rollupStream.setStreamContext(streamContext);
      tuples = getTuples(rollupStream);
      // Check that we've got the extra NULL bucket
      assertEquals(4, tuples.size());
      tuple = tuples.get(0);
      assertEquals("NULL", tuple.getString("a_s"));

      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals(14, sumi, 0.01);
      assertEquals(10, sumf, 0.01);
      assertEquals(14, mini, 0.01);
      assertEquals(10, minf, 0.01);
      assertEquals(14, maxi, 0.01);
      assertEquals(10, maxf, 0.01);
      assertEquals(14, avgi, 0.01);
      assertEquals(10, avgf, 0.01);
      assertEquals(1, count, 0.01);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testDaemonTopicStream() throws Exception {
    Assume.assumeTrue(!useAlias);

    StreamContext context = new StreamContext();
    SolrClientCache cache = new SolrClientCache();
    context.setSolrClientCache(cache);

    try {
      SolrParams sParams = params("q", "a_s:hello0", "rows", "500", "fl", "id");

      TopicStream topicStream =
          new TopicStream(
              zkHost, COLLECTIONORALIAS, COLLECTIONORALIAS, "50000000", -1, 1000000, sParams);

      DaemonStream daemonStream = new DaemonStream(topicStream, "daemon1", 1000, 500);
      daemonStream.setStreamContext(context);

      daemonStream.open();
      CheckDaemonStream(context, daemonStream);

      // We should get an error if we try to open an already-open stream.
      final IOException ex =
          expectThrows(
              IOException.class,
              () -> {
                daemonStream.open();
              });
      assertEquals(
          "Should have an intelligible exception message",
          ex.getMessage(),
          "There is already an open daemon named 'daemon1', no action taken.");
      daemonStream.close();

      // We should be able to close then re-open the stream, then close it again, see SOLR-13408
      daemonStream.open();
      CheckDaemonStream(context, daemonStream);
      daemonStream.close();
    } finally {
      cache.close();
    }
  }

  private void CheckDaemonStream(StreamContext context, DaemonStream daemonStream)
      throws IOException, SolrServerException {
    // Wait for the checkpoint
    JettySolrRunner jetty = cluster.getJettySolrRunners().get(0);

    SolrParams sParams1 = params("qt", "/get", "ids", "50000000", "fl", "id");
    int count = 0;
    while (count == 0) {
      SolrStream solrStream =
          new SolrStream(jetty.getBaseUrl().toString() + "/" + COLLECTIONORALIAS, sParams1);
      solrStream.setStreamContext(context);
      List<Tuple> tuples = getTuples(solrStream);
      count = tuples.size();
      if (count > 0) {
        Tuple t = tuples.get(0);
        assertEquals(50000000, t.getLong("id").longValue());
      } else {
        System.out.println("###### Waiting for checkpoint #######:" + count);
      }
    }

    new UpdateRequest()
        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1")
        .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2")
        .add(id, "3", "a_s", "hello0", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello0", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    for (int i = 0; i < 5; i++) {
      daemonStream.read();
    }

    new UpdateRequest()
        .add(id, "5", "a_s", "hello0", "a_i", "4", "a_f", "4")
        .add(id, "6", "a_s", "hello0", "a_i", "4", "a_f", "4")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    for (int i = 0; i < 2; i++) {
      daemonStream.read();
    }

    daemonStream.shutdown();

    Tuple tuple = daemonStream.read();

    assertTrue(tuple.EOF);
  }

  @Test
  public void testRollupWithNoParallel() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      // Intentionally adding partitionKeys to trigger SOLR-12674
      SolrParams sParamsA =
          params(
              "q",
              "*:*",
              "fl",
              "a_s,a_i,a_f",
              "sort",
              "a_s asc",
              "partitionKeys",
              "a_s",
              "qt",
              "/export");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      Bucket[] buckets = {new Bucket("a_s")};

      Metric[] metrics = {
        new SumMetric("a_i"),
        new SumMetric("a_f"),
        new MinMetric("a_i"),
        new MinMetric("a_f"),
        new MaxMetric("a_i"),
        new MaxMetric("a_f"),
        new MeanMetric("a_i"),
        new MeanMetric("a_f"),
        new CountMetric()
      };

      RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
      rollupStream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(rollupStream);
      assertEquals(3, tuples.size());

      List<String> shardUrls =
          TupleStream.getShards(
              cluster.getZkServer().getZkAddress(), COLLECTIONORALIAS, streamContext);
      ModifiableSolrParams solrParams = new ModifiableSolrParams();
      solrParams.add("qt", "/stream");
      solrParams.add(
          "expr",
          "rollup(search("
              + COLLECTIONORALIAS
              + ",q=\"*:*\",fl=\"a_s,a_i,a_f\",sort=\"a_s desc\",partitionKeys=\"a_s\", qt=\"/export\"),over=\"a_s\")\n");
      SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams);
      streamContext = new StreamContext();
      solrStream.setStreamContext(streamContext);
      tuples = getTuples(solrStream);
      assertEquals(3, tuples.size());
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testParallelRollupStream() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      SolrParams sParamsA =
          params(
              "q",
              "*:*",
              "fl",
              "a_s,a_i,a_f",
              "sort",
              "a_s asc",
              "partitionKeys",
              "a_s",
              "qt",
              "/export");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      Bucket[] buckets = {new Bucket("a_s")};

      Metric[] metrics = {
        new SumMetric("a_i"),
        new SumMetric("a_f"),
        new MinMetric("a_i"),
        new MinMetric("a_f"),
        new MaxMetric("a_i"),
        new MaxMetric("a_f"),
        new MeanMetric("a_i"),
        new MeanMetric("a_f"),
        new CountMetric()
      };

      RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
      ParallelStream parallelStream =
          parallelStream(rollupStream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
      attachStreamFactory(parallelStream);
      parallelStream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(parallelStream);

      assertEquals(3, tuples.size());

      // Test Long and Double Sums

      Tuple tuple = tuples.get(0);
      String bucket = tuple.getString("a_s");
      Double sumi = tuple.getDouble("sum(a_i)");
      Double sumf = tuple.getDouble("sum(a_f)");
      Double mini = tuple.getDouble("min(a_i)");
      Double minf = tuple.getDouble("min(a_f)");
      Double maxi = tuple.getDouble("max(a_i)");
      Double maxf = tuple.getDouble("max(a_f)");
      Double avgi = tuple.getDouble("avg(a_i)");
      Double avgf = tuple.getDouble("avg(a_f)");
      Double count = tuple.getDouble("count(*)");

      assertEquals("hello0", bucket);
      assertEquals(17, sumi, 0.001);
      assertEquals(18, sumf, 0.001);
      assertEquals(0, mini, 0.001);
      assertEquals(1, minf, 0.001);
      assertEquals(14, maxi, 0.001);
      assertEquals(10, maxf, 0.001);
      assertEquals(4.25, avgi, 0.001);
      assertEquals(4.5, avgf, 0.001);
      assertEquals(4, count, 0.001);

      tuple = tuples.get(1);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello3", bucket);
      assertEquals(38, sumi, 0.001);
      assertEquals(26, sumf, 0.001);
      assertEquals(3, mini, 0.001);
      assertEquals(3, minf, 0.001);
      assertEquals(13, maxi, 0.001);
      assertEquals(9, maxf, 0.001);
      assertEquals(9.5, avgi, 0.001);
      assertEquals(6.5, avgf, 0.001);
      assertEquals(4, count, 0.001);

      tuple = tuples.get(2);
      bucket = tuple.getString("a_s");
      sumi = tuple.getDouble("sum(a_i)");
      sumf = tuple.getDouble("sum(a_f)");
      mini = tuple.getDouble("min(a_i)");
      minf = tuple.getDouble("min(a_f)");
      maxi = tuple.getDouble("max(a_i)");
      maxf = tuple.getDouble("max(a_f)");
      avgi = tuple.getDouble("avg(a_i)");
      avgf = tuple.getDouble("avg(a_f)");
      count = tuple.getDouble("count(*)");

      assertEquals("hello4", bucket);
      assertEquals(15, sumi.longValue());
      assertEquals(11, sumf, 0.001);
      assertEquals(4, mini, 0.001);
      assertEquals(4, minf, 0.001);
      assertEquals(11, maxi, 0.001);
      assertEquals(7, maxf, 0.001);
      assertEquals(7.5, avgi, 0.001);
      assertEquals(5.5, avgf, 0.001);
      assertEquals(2, count, 0.001);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testZeroParallelReducerStream() throws Exception {

    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    try {
      SolrParams sParamsA =
          params(
              "q",
              "a_s:blah",
              "fl",
              "id,a_s,a_i,a_f",
              "sort",
              "a_s asc,a_f asc",
              "partitionKeys",
              "a_s",
              "qt",
              "/export");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);
      ReducerStream rstream =
          new ReducerStream(
              stream,
              new FieldEqualitor("a_s"),
              new GroupOperation(new FieldComparator("a_s", ComparatorOrder.ASCENDING), 2));
      ParallelStream pstream =
          parallelStream(rstream, new FieldComparator("a_s", ComparatorOrder.ASCENDING));
      attachStreamFactory(pstream);
      pstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(pstream);
      assertEquals(0, tuples.size());
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testTuple() throws Exception {

    new UpdateRequest()
        .add(
            id, "0", "a_s", "hello0", "a_i", "0", "a_f", "5.1", "s_multi", "a", "s_multi", "b",
            "i_multi", "1", "i_multi", "2", "f_multi", "1.2", "f_multi", "1.3")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      SolrParams sParams =
          params("q", "*:*", "fl", "id,a_s,a_i,a_f,s_multi,i_multi,f_multi", "sort", "a_s asc");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(stream);
      Tuple tuple = tuples.get(0);

      String s = tuple.getString("a_s");
      assertEquals("hello0", s);

      long l = tuple.getLong("a_i");
      assertEquals(0, l);

      double d = tuple.getDouble("a_f");
      assertEquals(5.1, d, 0.001);

      List<String> stringList = tuple.getStrings("s_multi");
      assertEquals("a", stringList.get(0));
      assertEquals("b", stringList.get(1));

      List<Long> longList = tuple.getLongs("i_multi");
      assertEquals(1, longList.get(0).longValue());
      assertEquals(2, longList.get(1).longValue());

      List<Double> doubleList = tuple.getDoubles("f_multi");
      assertEquals(1.2, doubleList.get(0), 0.001);
      assertEquals(1.3, doubleList.get(1), 0.001);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testMergeStream() throws Exception {

    new UpdateRequest()
        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      // Test ascending
      SolrParams sParamsA = params("q", "id:(4 1)", "fl", "id,a_s,a_i", "sort", "a_i asc");
      CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      SolrParams sParamsB = params("q", "id:(0 2 3)", "fl", "id,a_s,a_i", "sort", "a_i asc");
      CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);

      MergeStream mstream =
          new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
      mstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(mstream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 0, 1, 2, 3, 4);

      // Test descending
      sParamsA = params("q", "id:(4 1)", "fl", "id,a_s,a_i", "sort", "a_i desc");
      streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      sParamsB = params("q", "id:(0 2 3)", "fl", "id,a_s,a_i", "sort", "a_i desc");
      streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);

      mstream =
          new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
      mstream.setStreamContext(streamContext);
      tuples = getTuples(mstream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 4, 3, 2, 1, 0);

      // Test compound sort

      sParamsA = params("q", "id:(2 4 1)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
      streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      sParamsB = params("q", "id:(0 3)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
      streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);

      mstream =
          new MergeStream(
              streamA,
              streamB,
              new MultipleFieldComparator(
                  new FieldComparator("a_f", ComparatorOrder.ASCENDING),
                  new FieldComparator("a_i", ComparatorOrder.ASCENDING)));
      mstream.setStreamContext(streamContext);
      tuples = getTuples(mstream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 0, 2, 1, 3, 4);

      sParamsA = params("q", "id:(2 4 1)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
      streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      sParamsB = params("q", "id:(0 3)", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
      streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);

      mstream =
          new MergeStream(
              streamA,
              streamB,
              new MultipleFieldComparator(
                  new FieldComparator("a_f", ComparatorOrder.ASCENDING),
                  new FieldComparator("a_i", ComparatorOrder.DESCENDING)));
      mstream.setStreamContext(streamContext);
      tuples = getTuples(mstream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 2, 0, 1, 3, 4);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testParallelMergeStream() throws Exception {

    new UpdateRequest()
        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
        .add(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0")
        .add(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0")
        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3")
        .add(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4")
        .add(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      // Test ascending
      SolrParams sParamsA =
          params(
              "q",
              "id:(4 1 8 7 9)",
              "fl",
              "id,a_s,a_i",
              "sort",
              "a_i asc",
              "partitionKeys",
              "a_i",
              "qt",
              "/export");
      CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      SolrParams sParamsB =
          params(
              "q",
              "id:(0 2 3 6)",
              "fl",
              "id,a_s,a_i",
              "sort",
              "a_i asc",
              "partitionKeys",
              "a_i",
              "qt",
              "/export");
      CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);

      MergeStream mstream =
          new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
      ParallelStream pstream =
          parallelStream(mstream, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
      attachStreamFactory(pstream);
      pstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(pstream);

      assertEquals(9, tuples.size());
      assertOrder(tuples, 0, 1, 2, 3, 4, 7, 6, 8, 9);

      // Test descending
      sParamsA =
          params(
              "q",
              "id:(4 1 8 9)",
              "fl",
              "id,a_s,a_i",
              "sort",
              "a_i desc",
              "partitionKeys",
              "a_i",
              "qt",
              "/export");
      streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      sParamsB =
          params(
              "q",
              "id:(0 2 3 6)",
              "fl",
              "id,a_s,a_i",
              "sort",
              "a_i desc",
              "partitionKeys",
              "a_i",
              "qt",
              "/export");
      streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);

      mstream =
          new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
      pstream = parallelStream(mstream, new FieldComparator("a_i", ComparatorOrder.DESCENDING));
      attachStreamFactory(pstream);
      pstream.setStreamContext(streamContext);
      tuples = getTuples(pstream);

      assertEquals(8, tuples.size());
      assertOrder(tuples, 9, 8, 6, 4, 3, 2, 1, 0);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testParallelEOF() throws Exception {

    new UpdateRequest()
        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
        .add(id, "5", "a_s", "hello0", "a_i", "10", "a_f", "0")
        .add(id, "6", "a_s", "hello2", "a_i", "8", "a_f", "0")
        .add(id, "7", "a_s", "hello3", "a_i", "7", "a_f", "3")
        .add(id, "8", "a_s", "hello4", "a_i", "11", "a_f", "4")
        .add(id, "9", "a_s", "hello1", "a_i", "100", "a_f", "1")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);

    try {
      // Test ascending
      SolrParams sParamsA =
          params(
              "q",
              "id:(4 1 8 7 9)",
              "fl",
              "id,a_s,a_i",
              "sort",
              "a_i asc",
              "partitionKeys",
              "a_i",
              "qt",
              "/export");
      CloudSolrStream streamA = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA);

      SolrParams sParamsB =
          params(
              "q",
              "id:(0 2 3 6)",
              "fl",
              "id,a_s,a_i",
              "sort",
              "a_i asc",
              "partitionKeys",
              "a_i",
              "qt",
              "/export");
      CloudSolrStream streamB = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsB);

      MergeStream mstream =
          new MergeStream(streamA, streamB, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
      ParallelStream pstream =
          parallelStream(mstream, new FieldComparator("a_i", ComparatorOrder.ASCENDING));
      attachStreamFactory(pstream);
      pstream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(pstream);

      assertEquals(9, tuples.size());
      Map<String, Tuple> eofTuples = pstream.getEofTuples();
      assertEquals(numWorkers, eofTuples.size()); // There should be an EOF Tuple for each worker.
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void streamTests() throws Exception {

    new UpdateRequest()
        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    // Basic CloudSolrStream Test with Descending Sort

    try {
      SolrParams sParams = params("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i desc");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(stream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 4, 3, 2, 1, 0);

      // With Ascending Sort
      sParams = params("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 0, 1, 2, 3, 4);

      // Test compound sort
      sParams = params("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 2, 0, 1, 3, 4);

      sParams = params("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 0, 2, 1, 3, 4);
    } finally {
      solrClientCache.close();
    }
  }

  /**
   * This test verifies that setting a core into the stream context entries and streamContext.local
   * = true causes the streaming expression to only consider data found on the local node.
   */
  @Test
  public void streamLocalTests() throws Exception {

    new UpdateRequest()
        .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "0")
        .add(id, "2", "a_s", "hello2", "a_i", "2", "a_f", "0")
        .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3")
        .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4")
        .add(id, "1", "a_s", "hello1", "a_i", "1", "a_f", "1")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    StreamContext streamContext = new StreamContext();
    streamContext.setLocal(true);
    ZkStateReader zkStateReader = cluster.getZkStateReader();
    List<String> strings =
        zkStateReader.aliasesManager.getAliases().resolveAliases(COLLECTIONORALIAS);
    String collName = strings.size() > 0 ? strings.get(0) : COLLECTIONORALIAS;
    zkStateReader.forceUpdateCollection(collName);
    DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(collName);
    List<Replica> replicas = collection.getReplicas();
    streamContext
        .getEntries()
        .put("core", replicas.get(random().nextInt(replicas.size())).getCoreName());
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    // Basic CloudSolrStream Test with Descending Sort

    try {
      SolrParams sParams = params("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i desc");
      CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(stream);

      // note if hashing algo changes this might break
      switch (tuples.size()) {
        case 5: // 1 shard
          assertOrder(tuples, 4, 3, 2, 1, 0);
          break;
        case 3: // 2 shards case 1 (randomized)
          assertOrder(tuples, 4, 1, 0);
          break;
        case 2: // 2 shards case 2 (randomized)
          assertOrder(tuples, 3, 2);
          break;
        default: // nope, no way, no how, never good.
          fail("should have 3, 5 or 2 tuples, has hashing algorithm changed?");
      }

      // With Ascending Sort
      sParams = params("q", "*:*", "fl", "id,a_s,a_i", "sort", "a_i asc");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      // note if hashing algo changes this might break
      switch (tuples.size()) {
        case 5: // 1 shard
          assertOrder(tuples, 0, 1, 2, 3, 4);
          break;
        case 3: // 2 shards case 1 (randomized)
          assertOrder(tuples, 0, 1, 4);
          break;
        case 2: // 2 shards case 2 (randomized)
          assertOrder(tuples, 2, 3);
          break;
        default: // nope, no way, no how, never good.
          fail("should have 3, 5 or 2 tuples, has hashing algorithm changed?");
      }

      // Test compound sort
      sParams = params("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i desc");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      // note if hashing algo changes this might break
      switch (tuples.size()) {
        case 5: // 1 shard
          assertOrder(tuples, 2, 0, 1, 3, 4);
          break;
        case 3: // 2 shards case 1 (randomized)
          assertOrder(tuples, 0, 1, 4);
          break;
        case 2: // 2 shards case 2 (randomized)
          assertOrder(tuples, 2, 3);
          break;
        default: // nope, no way, no how, never good.
          fail("should have 3, 5 or 2 tuples, has hashing algorithm changed?");
      }

      sParams = params("q", "*:*", "fl", "id,a_s,a_i,a_f", "sort", "a_f asc,a_i asc");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      // note if hashing algo changes this might break
      switch (tuples.size()) {
        case 5: // 1 shard
          assertOrder(tuples, 0, 2, 1, 3, 4);
          break;
        case 3: // 2 shards case 1 (randomized)
          assertOrder(tuples, 0, 1, 4);
          break;
        case 2: // 2 shards case 2 (randomized)
          assertOrder(tuples, 2, 3);
          break;
        default: // nope, no way, no how, never good.
          fail("should have 3, 5 or 2 tuples, has hashing algorithm changed?");
      }

    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testDateBoolSorting() throws Exception {

    new UpdateRequest()
        .add(id, "0", "b_sing", "false", "dt_sing", "1981-03-04T01:02:03.78Z")
        .add(id, "3", "b_sing", "true", "dt_sing", "1980-03-04T01:02:03.78Z")
        .add(id, "2", "b_sing", "false", "dt_sing", "1981-04-04T01:02:03.78Z")
        .add(id, "1", "b_sing", "true", "dt_sing", "1980-04-04T01:02:03.78Z")
        .add(id, "4", "b_sing", "true", "dt_sing", "1980-04-04T01:02:03.78Z")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    trySortWithQt("/export");
    trySortWithQt("/select");
  }

  private void trySortWithQt(String which) throws Exception {
    // Basic CloudSolrStream Test bools desc

    SolrParams sParams =
        params("q", "*:*", "qt", which, "fl", "id,b_sing", "sort", "b_sing asc,id asc");
    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);

    try {
      stream.setStreamContext(streamContext);
      List<Tuple> tuples = getTuples(stream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 0, 2, 1, 3, 4);

      // Basic CloudSolrStream Test bools desc
      sParams = params("q", "*:*", "qt", which, "fl", "id,b_sing", "sort", "b_sing desc,id desc");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 4, 3, 1, 2, 0);

      // Basic CloudSolrStream Test dates desc
      sParams = params("q", "*:*", "qt", which, "fl", "id,dt_sing", "sort", "dt_sing desc,id asc");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 2, 0, 1, 4, 3);

      // Basic CloudSolrStream Test ates desc
      sParams = params("q", "*:*", "qt", which, "fl", "id,dt_sing", "sort", "dt_sing asc,id desc");
      stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams);
      stream.setStreamContext(streamContext);
      tuples = getTuples(stream);

      assertEquals(5, tuples.size());
      assertOrder(tuples, 3, 4, 1, 0, 2);
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testAllValidExportTypes() throws Exception {

    // Test whether all the expected types are actually returned, including booleans and dates.
    // The contract is that the /select and /export handlers return the same format, so we can test
    // this once each way
    new UpdateRequest()
        .add(
            id,
            "0",
            "i_sing",
            "11",
            "i_multi",
            "12",
            "i_multi",
            "13",
            "l_sing",
            "14",
            "l_multi",
            "15",
            "l_multi",
            "16",
            "f_sing",
            "1.70",
            "f_multi",
            "1.80",
            "f_multi",
            "1.90",
            "d_sing",
            "1.20",
            "d_multi",
            "1.21",
            "d_multi",
            "1.22",
            "s_sing",
            "single",
            "s_multi",
            "sm1",
            "s_multi",
            "sm2",
            "dt_sing",
            "1980-01-02T11:11:33.89Z",
            "dt_multi",
            "1981-03-04T01:02:03.78Z",
            "dt_multi",
            "1981-05-24T04:05:06.99Z",
            "b_sing",
            "true",
            "b_multi",
            "false",
            "b_multi",
            "true")
        .commit(cluster.getSolrClient(), COLLECTIONORALIAS);

    tryWithQt("/export");
    tryWithQt("/select");
  }

  // We should be getting the exact same thing back with both the export and select handlers, so
  // test
  private void tryWithQt(String which) throws IOException {
    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    streamContext.setSolrClientCache(solrClientCache);
    SolrParams sParams =
        params(
            "q",
            "*:*",
            "qt",
            which,
            "fl",
            "id,i_sing,i_multi,l_sing,l_multi,f_sing,f_multi,d_sing,d_multi,dt_sing,dt_multi,s_sing,s_multi,b_sing,b_multi",
            "sort",
            "i_sing asc");
    try (CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams)) {

      stream.setStreamContext(streamContext);
      Tuple tuple = getTuple(stream);

      assertEquals("Integers should be returned", 11, tuple.getLong("i_sing").longValue());
      assertEquals(
          "MV should be returned for i_multi", 12, tuple.getLongs("i_multi").get(0).longValue());
      assertEquals(
          "MV should be returned for i_multi", 13, tuple.getLongs("i_multi").get(1).longValue());

      assertEquals("longs should be returned", 14, tuple.getLong("l_sing").longValue());
      assertEquals(
          "MV should be returned for l_multi", 15, tuple.getLongs("l_multi").get(0).longValue());
      assertEquals(
          "MV should be returned for l_multi", 16, tuple.getLongs("l_multi").get(1).longValue());

      assertEquals("floats should be returned", 1.7, tuple.getDouble("f_sing"), 0.001);
      assertEquals(
          "MV should be returned for f_multi", 1.8, tuple.getDoubles("f_multi").get(0), 0.001);
      assertEquals(
          "MV should be returned for f_multi", 1.9, tuple.getDoubles("f_multi").get(1), 0.001);

      assertEquals("doubles should be returned", 1.2, tuple.getDouble("d_sing"), 0.001);
      assertEquals(
          "MV should be returned for d_multi", 1.21, tuple.getDoubles("d_multi").get(0), 0.001);
      assertEquals(
          "MV should be returned for d_multi", 1.22, tuple.getDoubles("d_multi").get(1), 0.001);

      assertEquals("Strings should be returned", "single", tuple.getString("s_sing"));
      assertEquals("MV should be returned for s_multi", "sm1", tuple.getStrings("s_multi").get(0));
      assertEquals("MV should be returned for s_multi", "sm2", tuple.getStrings("s_multi").get(1));

      assertEquals(
          "Dates should be returned as Strings",
          "1980-01-02T11:11:33.890Z",
          tuple.getString("dt_sing"));
      assertEquals(
          "MV dates should be returned as Strings for dt_multi",
          "1981-03-04T01:02:03.780Z",
          tuple.getStrings("dt_multi").get(0));
      assertEquals(
          "MV dates should be returned as Strings for dt_multi",
          "1981-05-24T04:05:06.990Z",
          tuple.getStrings("dt_multi").get(1));

      // Also test native type conversion
      Date dt = new Date(Instant.parse("1980-01-02T11:11:33.890Z").toEpochMilli());
      assertEquals(
          "Dates should be returned as Dates",
          tuple.getDate("dt_sing").toInstant(),
          dt.toInstant());
      dt = new Date(Instant.parse("1981-03-04T01:02:03.780Z").toEpochMilli());
      assertEquals(
          "MV dates should be returned as Dates for dt_multi",
          tuple.getDates("dt_multi").get(0).toInstant(),
          dt.toInstant());
      dt = new Date(Instant.parse("1981-05-24T04:05:06.990Z").toEpochMilli());
      assertEquals(
          "MV dates should be returned as Dates  for dt_multi",
          tuple.getDates("dt_multi").get(1).toInstant(),
          dt.toInstant());

      assertTrue("Booleans should be returned", tuple.getBool("b_sing"));
      assertFalse("MV boolean should be returned for b_multi", tuple.getBools("b_multi").get(0));
      assertTrue("MV boolean should be returned for b_multi", tuple.getBools("b_multi").get(1));
    } finally {
      solrClientCache.close();
    }
  }

  @Test
  public void testTupleStreamGetShardsPreference() throws Exception {
    StreamContext streamContext = new StreamContext();
    streamContext.setSolrClientCache(new SolrClientCache());
    streamContext.setRequestReplicaListTransformerGenerator(
        new RequestReplicaListTransformerGenerator(
            ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG", null, null, null, null));

    streamContext.setRequestParams(
        params(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":nrt"));

    try {
      ZkStateReader zkStateReader = cluster.getZkStateReader();
      List<String> strings =
          zkStateReader.aliasesManager.getAliases().resolveAliases(MULTI_REPLICA_COLLECTIONORALIAS);
      String collName = strings.size() > 0 ? strings.get(0) : MULTI_REPLICA_COLLECTIONORALIAS;
      Map<String, String> replicaTypeMap =
          mapReplicasToReplicaType(zkStateReader.getClusterState().getCollectionOrNull(collName));

      // Test from extra params
      SolrParams sParams =
          params(
              "q",
              "*:*",
              ShardParams.SHARDS_PREFERENCE,
              ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":pull");
      testTupleStreamSorting(streamContext, sParams, "PULL", replicaTypeMap);

      // Test defaults from streamContext.getParams()
      testTupleStreamSorting(streamContext, new ModifiableSolrParams(), "NRT", replicaTypeMap);

      // Test defaults from the RLTG
      streamContext.setRequestParams(new ModifiableSolrParams());
      testTupleStreamSorting(streamContext, new ModifiableSolrParams(), "TLOG", replicaTypeMap);
    } finally {
      streamContext.getSolrClientCache().close();
    }
  }

  public void testTupleStreamSorting(
      StreamContext streamContext,
      SolrParams solrParams,
      String replicaType,
      Map<String, String> replicaTypeMap)
      throws Exception {
    List<String> shards =
        TupleStream.getShards(
            cluster.getZkClient().getZkServerAddress(),
            MULTI_REPLICA_COLLECTIONORALIAS,
            streamContext,
            solrParams);
    for (String shard : shards) {
      assertEquals(
          shard,
          replicaType.toUpperCase(Locale.ROOT),
          replicaTypeMap.getOrDefault(shard, "").toUpperCase(Locale.ROOT));
    }
  }

  protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
    tupleStream.open();
    List<Tuple> tuples = new ArrayList<>();
    for (; ; ) {
      Tuple t = tupleStream.read();
      if (t.EOF) {
        break;
      } else {
        tuples.add(t);
      }
    }
    tupleStream.close();
    return tuples;
  }

  protected Tuple getTuple(TupleStream tupleStream) throws IOException {
    tupleStream.open();
    Tuple t = tupleStream.read();
    tupleStream.close();
    return t;
  }

  protected boolean assertOrder(List<Tuple> tuples, int... ids) {
    int i = 0;
    for (int val : ids) {
      Tuple t = tuples.get(i);
      String tip = (String) t.get("id");
      String valStr = Integer.toString(val);
      assertEquals(valStr, tip);
      ++i;
    }
    return true;
  }

  protected boolean assertGroupOrder(Tuple tuple, int... ids) {
    @SuppressWarnings({"rawtypes"})
    List group = (List) tuple.get("tuples");
    int i = 0;
    for (int val : ids) {
      @SuppressWarnings({"rawtypes"})
      Map t = (Map) group.get(i);
      int tip = (int) t.get("id");
      assertEquals(val, tip);
      ++i;
    }
    return true;
  }

  public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
    long lv = (long) tuple.get(fieldName);
    if (lv != l) {
      throw new Exception("Longs not equal:" + l + " : " + lv);
    }

    return true;
  }

  private void attachStreamFactory(TupleStream tupleStream) {
    StreamContext streamContext = new StreamContext();
    streamContext.setStreamFactory(streamFactory);
    tupleStream.setStreamContext(streamContext);
  }

  private ParallelStream parallelStream(TupleStream stream, FieldComparator comparator)
      throws IOException {
    return new ParallelStream(zkHost, COLLECTIONORALIAS, stream, numWorkers, comparator);
  }

  public void testCloudSolrStreamWithoutStreamContext() throws Exception {
    SolrParams sParams = params("q", "*:*", "fl", "id", "sort", "id asc");
    try (CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParams)) {
      stream.open();
    }
  }

  @Test
  public void testCloudStreamClientCache() throws Exception {

    StreamContext streamContext = new StreamContext();
    SolrClientCache solrClientCache = new SolrClientCache();
    solrClientCache.getCloudSolrClient(zkHost);
    streamContext.setSolrClientCache(solrClientCache);

    String expr =
        "search("
            + MULTI_REPLICA_COLLECTIONORALIAS
            + ",q=*:*,fl=\"a_i\", qt=\"/export\", sort=\"a_i asc\")";
    try (CloudSolrStream stream =
        new CloudSolrStream(StreamExpressionParser.parse(expr), streamFactory)) {
      stream.setStreamContext(streamContext);
      stream.open();
      Tuple t = stream.read();
      while (!t.EOF) {
        // no-op ... just want to iterate over the tuples
        t = stream.read();
      }

      List<String> baseUrls = new ArrayList<>();
      ZkStateReader zkStateReader = cluster.getZkStateReader();
      List<String> resolved =
          zkStateReader.aliasesManager.getAliases().resolveAliases(MULTI_REPLICA_COLLECTIONORALIAS);
      Set<String> liveNodes = zkStateReader.getClusterState().getLiveNodes();
      int expectedNumStreams = 0;
      for (String coll : resolved) {
        DocCollection dcoll = zkStateReader.getCollection(coll);
        for (Slice slice : dcoll.getSlices()) {
          ++expectedNumStreams; // one Stream per slice
          for (Replica r : slice.getReplicas()) {
            if (r.isActive(liveNodes)) {
              baseUrls.add(r.getBaseUrl());
            }
          }
        }
      }
      List<TupleStream> solrStreams = stream.children();
      assertEquals(expectedNumStreams, solrStreams.size());
      for (TupleStream next : solrStreams) {
        SolrStream ss = (SolrStream) next;
        // SolrStream uses the baseUrl of the replica and not the coreUrl
        assertTrue(baseUrls.contains(ss.getBaseUrl()));
      }

      // verify core filtering
      streamContext.setLocal(true);

      for (String coll : resolved) {
        Replica rr = zkStateReader.getCollection(coll).getReplicas().get(0);
        streamContext.put("core", rr.core);
        List<Replica> replicas =
            TupleStream.getReplicas(zkHost, coll, streamContext, new ModifiableSolrParams());
        assertEquals(
            "core filter for " + rr.core + " not applied for " + coll + "; replicas: " + replicas,
            1,
            replicas.size());
        assertEquals(
            "core filter for " + rr.core + " not applied for " + coll, rr, replicas.get(0));
      }

    } finally {
      solrClientCache.close();
    }
  }
}
